
package com.example.source.debezium;

import io.debezium.embedded.Connect;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import org.apache.kafka.connect.data.Struct;

//@Service
@Slf4j
public class CdcInitService {

    @PostConstruct
    public void init() {
        final Properties props = new Properties();

        props.setProperty("name", "instala-core");
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        //偏移量持久化,用来容错
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        //偏移量持久化文件路径,默认/tmp/offsets.dat,如果路径配置不正确可能导致无法存储偏移量 可能会导致重复消费变更
        String userDir = System.getProperty("user.dir");
        props.setProperty("offset.storage.file.filename", userDir + "offsets.dat");
        //如果连接器重新启动，它将使用最后记录的偏移量来知道它应该恢复读取源信息中的哪个位置。
        props.setProperty("offset.flush.interval.ms", "60000");

        //需要监听的数据库名称
        props.setProperty("database.whitelist", "study");
        //initial(默认) 	连接器执行数据库的初始一致性快照，快照完成后，连接器开始为后续数据库更改流式传输事件记录。
        //initial_only 	连接器只执行数据库的初始一致性快照，不允许捕获任何后续更改的事件。
        //schema_only 	连接器只捕获所有相关表的表结构，不捕获初始数据，但是会同步后续数据库的更改记录
        //schema_only_recovery 	设置此选项可恢复丢失或损坏的数据库历史主题(database.history.kafka.topic)。
        props.setProperty("snapshot.mode", "schema_only");
        //数据库地址
        props.setProperty("database.hostname", "127.0.0.1");
        //数据库端口
        props.setProperty("database.port", "3306");
        //数据库用户名
        props.setProperty("database.user", "root");
        //数据库密码
        props.setProperty("database.password", "root");
        //server.id起到唯一标识作用,随意起
        props.setProperty("database.server.id", "1");
        //server.name起到唯一标识作用,随意起
        props.setProperty("database.server.name", "mysql-connector-java");

        //历史变更记录
        props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
        //历史变更记录存储位置
        props.setProperty("database.history.file.filename", userDir+"dbhistory.dat");

        //格式化日期
        props.setProperty("converters", "dateConverters");
        props.setProperty("dateConverters.type", "com.example.source.debezium.MySqlDateTimeConverter");
        props.setProperty("dateConverters.format.date", "yyyy-MM-dd");
        props.setProperty("dateConverters.format.time", "HH:mm:ss");
        props.setProperty("dateConverters.format.datetime", "yyyy-MM-dd HH:mm:ss");
        props.setProperty("dateConverters.format.timestamp", "yyyy-MM-dd HH:mm:ss");
        props.setProperty("dateConverters.format.timestamp.zone", "UTC+8");

        //全局读写锁,会影响在线业务,所以跳过锁设置
        props.setProperty("debezium.snapshot.locking.mode", "none");
        //是否包含数据库表结构层面的变更,建议使用默认值true
        props.setProperty("include.schema.changes", "true");

        //指定 BIGINT UNSIGNED 列应如何在更改事件中表示。可能的设置有
        //long使用 Java 的 表示值long，这可能无法提供精确度，但在消费者中易于使用。long通常是首选设置。
        props.setProperty("bigint.unsigned.handling.mode", "long");
        //decimal类型转换为double
        props.setProperty("decimal.handling.mode", "double");

        DebeziumEngine<RecordChangeEvent<SourceRecord>> engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(props)
                .notifying(record -> {
                    Struct value = (Struct) record.record().value();
                    if (value == null) return;

                    String op = value.getString("op");
                    Struct before = (Struct) value.get("before");
                    Struct after = (Struct) value.get("after");
                    String table = ((Struct) value.get("source")).getString("table");

                    String sql = null;
                    switch (op) {
                        case "c":
                            sql = SQLGenerator.buildInsertSQL(table, after);
                            break;
                        case "u":
                            sql = SQLGenerator.buildUpdateSQL(table, before, after);
                            break;
                        case "d":
                            sql = SQLGenerator.buildDeleteSQL(table, before);
                            break;
                    }
                    System.out.println("Rebuild SQL: " + sql);
                })
                .build();

        Executors.newSingleThreadExecutor().execute(engine);

    }
}